package com.zhang.second.day03;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * @title: Custom parallel data source
 * @author: zhang
 * @date: 2022/1/30 13:35
 *
 * Registers an anonymous {@link ParallelSourceFunction} that emits the integers
 * 1 through 8, runs it with a parallelism of 2, and prints every emitted
 * element with a print sink that also has a parallelism of 2.
 */
public class Example4 {
    /**
     * Builds the streaming job and executes it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        env
                .addSource(new ParallelSourceFunction<Integer>() {
                    // Cleared by cancel(); volatile because cancel() is invoked
                    // from a different thread than the one executing run().
                    private volatile boolean running = true;

                    // Every parallel subtask triggers its own call to run(),
                    // so each subtask emits the full 1..8 sequence.
                    @Override
                    public void run(SourceContext<Integer> ctx) throws Exception {
                        // Stop emitting as soon as the source has been cancelled.
                        for (int i = 1; i < 9 && running; i++) {
                            ctx.collect(i);
                        }
                    }

                    // Signals run() to leave its emit loop instead of silently
                    // ignoring the cancellation request.
                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .setParallelism(2)
                .print().setParallelism(2);

        env.execute();
    }
}
